rkhachatryan commented on a change in pull request #15420:
URL: https://github.com/apache/flink/pull/15420#discussion_r657536620



##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -369,7 +509,73 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
     // Factory function interface
     private interface StateFactory {
         <K, N, SV, S extends State, IS extends S> IS create(
-                InternalKvState<K, N, SV> kvState, KvStateChangeLogger<SV, N> changeLogger)
+                InternalKvState<K, N, SV> kvState,
+                KvStateChangeLogger<SV, N> changeLogger,
+                InternalKeyContext<K> keyContext)
                 throws Exception;
     }
+
+    /**
+     * @param name state name
+     * @param type state type (the only supported type currently are: {@link
+     *     BackendStateType#KEY_VALUE key value}, {@link BackendStateType#PRIORITY_QUEUE priority
+     *     queue})
+     * @return an existing state, i.e. the one that was already created
+     * @throws NoSuchElementException if the state wasn't created
+     * @throws UnsupportedOperationException if state type is not supported
+     */
+    public ChangelogState getExistingState(String name, BackendStateType type)
+            throws NoSuchElementException, UnsupportedOperationException {
+        ChangelogState state;
+        switch (type) {
+            case KEY_VALUE:
+                state = (ChangelogState) keyValueStatesByName.get(name);

Review comment:
      `ChangelogKeyedStateBackend` differs a bit from other backends here. On recovery,
   1. Delegated backends are created as usual (with metadata `M1`)
   1. `ChangelogBackendLogApplier` reads metadata `M2` in `restoreKvMetaData()` and calls `changelogBackend.getOrCreateKeyedState()` (`M2` replaces `M1` in the delegated backend)
   1. After that, `ChangelogBackendLogApplier.applyDataChange()` calls `changelogBackend.getExistingState()` to apply data changes
   1. The user program may call `changelogBackend.getOrCreateKeyedState()` with metadata `M3`
   
   So `changelogBackend.keyValueStatesByName` **is** updated on recovery (in step 2).
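To make the ordering concrete, here is a heavily simplified sketch. All types are hypothetical stand-ins (`SketchChangelogBackend` and `SketchStateType` are not Flink's `ChangelogKeyedStateBackend` or `BackendStateType`), and states are modeled as plain objects: step 2 registers the state via `getOrCreateKeyedState()`, so the `getExistingState()` lookup in step 3 succeeds, while a lookup for a state that was never created throws `NoSuchElementException`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical stand-in for BackendStateType; only the first two are "supported" here.
enum SketchStateType { KEY_VALUE, PRIORITY_QUEUE, OPERATOR }

// Hypothetical, simplified model of the changelog backend's state registry.
final class SketchChangelogBackend {
    private final Map<String, Object> keyValueStatesByName = new HashMap<>();
    private final Map<String, Object> priorityQueuesByName = new HashMap<>();

    // Step 2: after reading the metadata, the log applier creates (and registers) the state.
    Object getOrCreateKeyedState(String name) {
        return keyValueStatesByName.computeIfAbsent(name, n -> new Object());
    }

    Object getOrCreatePriorityQueue(String name) {
        return priorityQueuesByName.computeIfAbsent(name, n -> new Object());
    }

    // Step 3: data changes can only be applied to states that already exist.
    Object getExistingState(String name, SketchStateType type) {
        Map<String, Object> states;
        switch (type) {
            case KEY_VALUE:
                states = keyValueStatesByName;
                break;
            case PRIORITY_QUEUE:
                states = priorityQueuesByName;
                break;
            default:
                throw new UnsupportedOperationException("Unsupported state type: " + type);
        }
        Object state = states.get(name);
        if (state == null) {
            throw new NoSuchElementException("State was not created: " + name);
        }
        return state;
    }
}
```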
   
   ---
   
   However, as you pointed out, state migration won't currently work.
   
   I see only one way to implement it: using delegating metadata and serializers, similar to the delegating functions in this PR:
   - in step 2, wrap the obtained serializers in `changelogBackend.createInternalState()` and pass them to the `delegatedBackend`
   - in step 4, upgrade the metadata and replace the wrapped serializers with the upgraded ones
   
   So the metadata of the delegated backend won't be upgraded explicitly, but effectively it will start using the new serializers.
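A rough sketch of the delegating-serializer idea, assuming a made-up single-method `SketchSerializer` interface (Flink's real `TypeSerializer` has a much larger contract). `DelegatingSerializer` and `SketchDelegatedState` are hypothetical names: the delegated state captures the wrapper once (step 2), and step 4 only swaps the wrapper's delegate.

```java
import java.util.Objects;

// Hypothetical minimal serializer interface (NOT Flink's TypeSerializer).
interface SketchSerializer<T> {
    byte[] serialize(T value);
}

// Hypothetical wrapper handed to the delegated backend in step 2.
// The changelog backend keeps a reference to it and can swap the delegate in step 4.
final class DelegatingSerializer<T> implements SketchSerializer<T> {
    // volatile so that a swap is visible to threads already holding the wrapper
    private volatile SketchSerializer<T> delegate;

    DelegatingSerializer(SketchSerializer<T> initial) {
        this.delegate = Objects.requireNonNull(initial);
    }

    // Step 4: the user program registered upgraded metadata; replace the serializer.
    void upgradeTo(SketchSerializer<T> upgraded) {
        this.delegate = Objects.requireNonNull(upgraded);
    }

    @Override
    public byte[] serialize(T value) {
        return delegate.serialize(value);
    }
}

// Hypothetical delegated state that captures its serializer once, at creation time.
final class SketchDelegatedState<T> {
    private final SketchSerializer<T> serializer;

    SketchDelegatedState(SketchSerializer<T> serializer) {
        this.serializer = serializer;
    }

    byte[] put(T value) {
        return serializer.serialize(value);
    }
}
```

Because the delegated state only ever sees the wrapper, an already created state picks up the upgraded serializer without being recreated. The sketch ignores the rest of the `TypeSerializer` contract (e.g. `duplicate()` and `snapshotConfiguration()`), which a real wrapper would also have to handle.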
   
   WDYT? (it's probably easier to discuss it offline)
   
   Other approaches don't seem to work:
   - If we simply call `delegatedBackend.createInternalState` to upgrade metadata, then already created states won't receive the updated serializers
   - If we replace existing internal states with new ones, we can end up with resource leaks (and potentially non-flushed changes)
   - (we do need to create internal delegated states during recovery, so that we can apply changes by calling `put`/`add`/`clear` on them)
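The second rejected approach can be illustrated with an equally hypothetical sketch (`BufferingState` and `ReplacingRegistry` are made up for illustration and do not correspond to Flink classes): if an "upgrade" just puts a new state instance into the map, the old instance is dropped without being closed, so its buffered changes never reach the sink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical state that buffers changes and flushes them only on close().
final class BufferingState implements AutoCloseable {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> sink; // stands in for durable storage
    private boolean closed;

    BufferingState(List<String> sink) {
        this.sink = sink;
    }

    void add(String change) {
        buffer.add(change);
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        sink.addAll(buffer); // flush buffered changes
        buffer.clear();
        closed = true;
    }
}

// Hypothetical registry that "upgrades" a state by replacing the instance.
final class ReplacingRegistry {
    private final Map<String, BufferingState> statesByName = new HashMap<>();
    private final List<String> sink;

    ReplacingRegistry(List<String> sink) {
        this.sink = sink;
    }

    BufferingState create(String name) {
        BufferingState state = new BufferingState(sink);
        // The previous instance (if any) is silently dropped: never closed, never flushed.
        statesByName.put(name, state);
        return state;
    }
}
```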




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
