rkhachatryan commented on a change in pull request #15420:
URL: https://github.com/apache/flink/pull/15420#discussion_r657536620
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -369,7 +509,73 @@ public void notifyCheckpointAborted(long checkpointId)
throws Exception {
// Factory function interface
private interface StateFactory {
<K, N, SV, S extends State, IS extends S> IS create(
- InternalKvState<K, N, SV> kvState, KvStateChangeLogger<SV, N> changeLogger)
+ InternalKvState<K, N, SV> kvState,
+ KvStateChangeLogger<SV, N> changeLogger,
+ InternalKeyContext<K> keyContext)
throws Exception;
}
+
+ /**
+ * @param name state name
+ * @param type state type (the only supported type currently are: {@link
+ * BackendStateType#KEY_VALUE key value}, {@link BackendStateType#PRIORITY_QUEUE priority
+ * queue})
+ * @return an existing state, i.e. the one that was already created
+ * @throws NoSuchElementException if the state wasn't created
+ * @throws UnsupportedOperationException if state type is not supported
+ */
+ public ChangelogState getExistingState(String name, BackendStateType type)
+ throws NoSuchElementException, UnsupportedOperationException {
+ ChangelogState state;
+ switch (type) {
+ case KEY_VALUE:
+ state = (ChangelogState) keyValueStatesByName.get(name);
Review comment:
`ChangelogKeyedStateBackend` differs a bit from other backends here. On recovery:
1. Delegated backends are created as usual (with metadata `M1`)
1. `ChangelogBackendLogApplier` reads metadata `M2` in `restoreKvMetaData()` and calls `changelogBackend.getOrCreateKeyedState()` (`M2` replaces `M1` in the delegated backend)
1. After that, `ChangelogBackendLogApplier.applyDataChange()` calls `changelogBackend.getExistingState()` to apply data changes
1. The user program may then call `changelogBackend.getOrCreateKeyedState()` with metadata `M3`
So `changelogBackend.keyValueStatesByName` **is** updated on recovery (in step 2).
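To make the ordering concrete, here is a minimal sketch of steps 2 to 4. `RecoveryOrderSketch` and `SketchState` are hypothetical stand-ins, the method signatures are simplified (the real `getOrCreateKeyedState()` takes serializers and a descriptor), and metadata handling is left out entirely. It only shows that the map is populated before any change is applied.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

class RecoveryOrderSketch {

    /** Hypothetical stand-in for a changelog state; it only counts applied changes. */
    static final class SketchState {
        final String name;
        int appliedChanges;

        SketchState(String name) {
            this.name = name;
        }
    }

    // Plays the role of keyValueStatesByName in the real backend.
    private final Map<String, SketchState> keyValueStatesByName = new HashMap<>();

    /** Simplified signature (assumption): creates the state on first call, reuses it afterwards. */
    SketchState getOrCreateKeyedState(String name) {
        return keyValueStatesByName.computeIfAbsent(name, SketchState::new);
    }

    /** Returns a state that was already created; fails otherwise. */
    SketchState getExistingState(String name) {
        SketchState state = keyValueStatesByName.get(name);
        if (state == null) {
            throw new NoSuchElementException("State was not created: " + name);
        }
        return state;
    }

    public static void main(String[] args) {
        RecoveryOrderSketch backend = new RecoveryOrderSketch();
        backend.getOrCreateKeyedState("count");                          // step 2: metadata restored
        backend.getExistingState("count").appliedChanges++;              // step 3: data change applied
        SketchState fromUser = backend.getOrCreateKeyedState("count");   // step 4: user access
        System.out.println(fromUser.appliedChanges);                     // prints 1
    }
}
```

The state returned to the user in step 4 is the same instance that received the changes in step 3.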
---
However, state migration won't currently work, as you pointed out.
The only way I see to implement it is with delegating serializers, similar to the delegating functions from this PR:
- in step 2, wrap the obtained serializers in `changelogBackend.createInternalState()` before passing them to the `delegatedBackend`; store the wrappers in a map keyed by state name
- in step 4, look up the wrappers and upgrade their wrapped serializers
So the metadata of the delegated backend won't be upgraded explicitly, but effectively it will start using the new serializers.
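A minimal sketch of the delegating-serializer idea, under stated assumptions: `Serializer`, `DelegatingSerializer`, `DelegatedState`, `createOnRecovery`, and `upgradeOnUserAccess` are all hypothetical names, not Flink's `TypeSerializer` API or code from this PR, and the string-returning `serialize` exists only to make the swap observable.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

class DelegatingSerializerSketch {

    /** Hypothetical stand-in for a serializer; not Flink's TypeSerializer. */
    interface Serializer<T> {
        String serialize(T value);
    }

    /** Wrapper handed to the delegated backend; its target can be swapped later. */
    static final class DelegatingSerializer<T> implements Serializer<T> {
        private Serializer<T> delegate;

        DelegatingSerializer(Serializer<T> delegate) {
            this.delegate = delegate;
        }

        void upgrade(Serializer<T> newDelegate) {
            this.delegate = newDelegate;
        }

        @Override
        public String serialize(T value) {
            return delegate.serialize(value);
        }
    }

    /** Hypothetical stand-in for an internal state owned by the delegated backend. */
    static final class DelegatedState<T> {
        private final Serializer<T> serializer;

        DelegatedState(Serializer<T> serializer) {
            this.serializer = serializer;
        }

        String write(T value) {
            return serializer.serialize(value);
        }
    }

    // Wrappers stored by state name so that step 4 can find them again.
    private final Map<String, DelegatingSerializer<?>> wrappersByStateName = new HashMap<>();

    /** Step 2: wrap the restored serializer before the delegated state sees it. */
    <T> DelegatedState<T> createOnRecovery(String name, Serializer<T> restored) {
        DelegatingSerializer<T> wrapper = new DelegatingSerializer<>(restored);
        wrappersByStateName.put(name, wrapper);
        return new DelegatedState<>(wrapper);
    }

    /** Step 4: swap the wrapped serializer; the existing state object is not replaced. */
    @SuppressWarnings("unchecked")
    <T> void upgradeOnUserAccess(String name, Serializer<T> newSerializer) {
        DelegatingSerializer<T> wrapper = (DelegatingSerializer<T>) wrappersByStateName.get(name);
        if (wrapper == null) {
            throw new NoSuchElementException("No wrapper for state: " + name);
        }
        wrapper.upgrade(newSerializer);
    }

    public static void main(String[] args) {
        DelegatingSerializerSketch backend = new DelegatingSerializerSketch();
        DelegatedState<Integer> state = backend.createOnRecovery("count", (Integer v) -> "v1:" + v);
        System.out.println(state.write(7)); // prints v1:7
        backend.upgradeOnUserAccess("count", (Integer v) -> "v2:" + v);
        System.out.println(state.write(7)); // prints v2:7
    }
}
```

The already-created state picks up the new serializer without being recreated, which is the property the two alternatives listed under "Other approaches" lack.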
WDYT? (it's probably easier to discuss it offline)
Other approaches don't seem to work:
- If we simply call `delegatedBackend.createInternalState` to upgrade the metadata, then already-created states won't receive the updated serializers
- If we replace existing internal states with new ones, we can end up with resource leaks (and potentially unflushed changes)
- (we do need to create internal delegated states during recovery, so that we can apply changes by calling `put`/`add`/`clear` on them)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]