rkhachatryan commented on code in PR #19679:
URL: https://github.com/apache/flink/pull/19679#discussion_r918295754


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java:
##########
@@ -163,6 +170,32 @@ KeyGroupedInternalPriorityQueue<T> create(
         return priorityQueuesManager.createOrUpdate(stateName, 
byteOrderedElementSerializer);
     }
 
+    @Override
+    public <N, SV, SEV, S extends State, IS extends S> IS upgradeKeyedState(
+            TypeSerializer<N> namespaceSerializer,
+            StateDescriptor<S, SV> stateDescriptor,
+            @Nonnull StateSnapshotTransformFactory<SEV> 
snapshotTransformFactory)
+            throws Exception {
+        
Preconditions.checkState(createdKVStates.containsKey(stateDescriptor.getName()));
+        registeredKVStates.computeIfPresent(
+                stateDescriptor.getName(),
+                (stateName, stateTable) -> {
+                    stateTable.setMetaInfo(
+                            new RegisteredKeyValueStateBackendMetaInfo<>(
+                                    stateTable.getMetaInfo().snapshot()));
+                    return stateTable;

Review Comment:
   1. I think we also need to update `StateMap`s serializers, don't we?
   2. The purpose of re-creating `RegisteredKeyValueStateBackendMetaInfo` isn't 
obvious (it is using a different `StateSerializerProvider`, right?). I think it 
deserves a comment, WDYT?



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -565,18 +568,44 @@ public <N, SV, SEV, S extends State, IS extends S> IS 
createInternalState(
                     StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
                             snapshotTransformFactory)
             throws Exception {
+        ChangelogState changelogState =
+                changelogStateFactory.getExistingState(
+                        stateDesc.getName(), BackendStateType.KEY_VALUE);
+        if (changelogState == null) {
+            InternalKvState<K, N, SV> state =
+                    keyedStateBackend.createInternalState(
+                            namespaceSerializer, stateDesc, 
snapshotTransformFactory);
+
+            changelogState =
+                    changelogStateFactory.create(
+                            stateDesc,
+                            state,
+                            getKvStateChangeLogger(state, stateDesc, 
snapshotTransformFactory),
+                            keyedStateBackend /* pass the nested backend as 
key context so that it get key updates on recovery*/);
+        } else {
+            InternalKvState<K, N, SV> state =
+                    keyedStateBackend.upgradeKeyedState(
+                            namespaceSerializer, stateDesc, 
snapshotTransformFactory);
+            changelogState.setDelegatedState(state);

Review Comment:
   Is it only the delegated state that needs to be upgraded?
   I think at least the serializer inside the logger needs to be upgraded as 
well.
   
   ditto: PQ state



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -565,18 +568,44 @@ public <N, SV, SEV, S extends State, IS extends S> IS 
createInternalState(
                     StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
                             snapshotTransformFactory)
             throws Exception {
+        ChangelogState changelogState =
+                changelogStateFactory.getExistingState(
+                        stateDesc.getName(), BackendStateType.KEY_VALUE);
+        if (changelogState == null) {

Review Comment:
   I'm wondering about the "normal" case - i.e. with both materialized and 
non-materialized state:
   1. The internal backend might receive serializers from its initial snapshot
   2. when first reading the `METADATA` record from changelog, `changelogState` 
will be null` 
   3. so `keyedStateBackend.upgradeKeyedState` will not be called
   
   Or am I missing something?
   
   ditto: PQ state



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to