rkhachatryan commented on code in PR #19679:
URL: https://github.com/apache/flink/pull/19679#discussion_r918295754

##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java:
##########
@@ -163,6 +170,32 @@ KeyGroupedInternalPriorityQueue<T> create(
         return priorityQueuesManager.createOrUpdate(stateName, byteOrderedElementSerializer);
     }
 
+    @Override
+    public <N, SV, SEV, S extends State, IS extends S> IS upgradeKeyedState(
+            TypeSerializer<N> namespaceSerializer,
+            StateDescriptor<S, SV> stateDescriptor,
+            @Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
+            throws Exception {
+        Preconditions.checkState(createdKVStates.containsKey(stateDescriptor.getName()));
+        registeredKVStates.computeIfPresent(
+                stateDescriptor.getName(),
+                (stateName, stateTable) -> {
+                    stateTable.setMetaInfo(
+                            new RegisteredKeyValueStateBackendMetaInfo<>(
+                                    stateTable.getMetaInfo().snapshot()));
+                    return stateTable;

Review Comment:
   1. I think we also need to update `StateMap`s serializers, don't we?
   2. The purpose of re-creating `RegisteredKeyValueStateBackendMetaInfo` isn't obvious (it is using a different `StateSerializerProvider`, right?). I think it deserves a comment, WDYT?

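   To make point 1 concrete, here is a minimal toy sketch of the concern. It is not Flink code: `ToyStateTable`, `ToyStateMap`, and every method name below are hypothetical stand-ins, and the sketch assumes that each map keeps its own serializer reference, which is exactly what would need checking in the real classes. Under that assumption, replacing only the table-level meta info leaves the per-map serializers pointing at the old one.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: none of these classes are Flink classes. */
class SerializerUpgradeSketch {

    /** Hypothetical stand-in for a map that holds its own serializer reference. */
    static final class ToyStateMap {
        String serializer;

        ToyStateMap(String serializer) {
            this.serializer = serializer;
        }
    }

    /** Hypothetical stand-in for a state table: one meta info plus several maps. */
    static final class ToyStateTable {
        String metaSerializer;
        final List<ToyStateMap> maps = new ArrayList<>();

        ToyStateTable(String serializer, int mapCount) {
            this.metaSerializer = serializer;
            for (int i = 0; i < mapCount; i++) {
                maps.add(new ToyStateMap(serializer));
            }
        }

        /** Same shape as the hunk above: only the table-level meta info is replaced. */
        void upgradeMetaOnly(String newSerializer) {
            metaSerializer = newSerializer;
        }

        /** What point 1 asks about: also push the new serializer into every map. */
        void upgradeMetaAndMaps(String newSerializer) {
            metaSerializer = newSerializer;
            for (ToyStateMap map : maps) {
                map.serializer = newSerializer;
            }
        }

        /** Counts maps whose serializer differs from the table's meta info. */
        long staleMapCount() {
            return maps.stream().filter(m -> !m.serializer.equals(metaSerializer)).count();
        }
    }

    public static void main(String[] args) {
        ToyStateTable table = new ToyStateTable("v1", 3);
        table.upgradeMetaOnly("v2");
        System.out.println("stale maps after meta-only upgrade: " + table.staleMapCount());
        table.upgradeMetaAndMaps("v2");
        System.out.println("stale maps after full upgrade: " + table.staleMapCount());
    }
}
```

   If the real `StateMap` implementations read the serializer from the table's meta info on every access instead of caching it, the meta-only path would already be sufficient and this concern would not apply.
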
##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -565,18 +568,44 @@ public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
                     StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
             throws Exception {
+        ChangelogState changelogState =
+                changelogStateFactory.getExistingState(
+                        stateDesc.getName(), BackendStateType.KEY_VALUE);
+        if (changelogState == null) {
+            InternalKvState<K, N, SV> state =
+                    keyedStateBackend.createInternalState(
+                            namespaceSerializer, stateDesc, snapshotTransformFactory);
+
+            changelogState =
+                    changelogStateFactory.create(
+                            stateDesc,
+                            state,
+                            getKvStateChangeLogger(state, stateDesc, snapshotTransformFactory),
+                            keyedStateBackend /* pass the nested backend as key context so that it get key updates on recovery*/);
+        } else {
+            InternalKvState<K, N, SV> state =
+                    keyedStateBackend.upgradeKeyedState(
+                            namespaceSerializer, stateDesc, snapshotTransformFactory);
+            changelogState.setDelegatedState(state);

Review Comment:
   Is it only the delegated state that needs to be upgraded?
   I think at least the serializer inside the logger needs to be upgraded as well.

   ditto: PQ state


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -565,18 +568,44 @@ public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
                     StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
             throws Exception {
+        ChangelogState changelogState =
+                changelogStateFactory.getExistingState(
+                        stateDesc.getName(), BackendStateType.KEY_VALUE);
+        if (changelogState == null) {

Review Comment:
   I'm wondering about the "normal" case - i.e. with both materialized and non-materialized state:
   1. The internal backend might receive serializers from its initial snapshot
   2. When first reading the `METADATA` record from the changelog, `changelogState` will be `null`
   3. So `keyedStateBackend.upgradeKeyedState` will not be called

   Or am I missing something?

   ditto: PQ state


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org