rkhachatryan commented on a change in pull request #15420:
URL: https://github.com/apache/flink/pull/15420#discussion_r657777451
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -369,7 +509,73 @@ public void notifyCheckpointAborted(long checkpointId)
throws Exception {
// Factory function interface
private interface StateFactory {
<K, N, SV, S extends State, IS extends S> IS create(
- InternalKvState<K, N, SV> kvState, KvStateChangeLogger<SV, N> changeLogger)
+ InternalKvState<K, N, SV> kvState,
+ KvStateChangeLogger<SV, N> changeLogger,
+ InternalKeyContext<K> keyContext)
throws Exception;
}
+
+ /**
+ * @param name state name
+ * @param type state type (the only supported type currently are: {@link
+ * BackendStateType#KEY_VALUE key value}, {@link BackendStateType#PRIORITY_QUEUE priority
+ * queue})
+ * @return an existing state, i.e. the one that was already created
+ * @throws NoSuchElementException if the state wasn't created
+ * @throws UnsupportedOperationException if state type is not supported
+ */
+ public ChangelogState getExistingState(String name, BackendStateType type)
+ throws NoSuchElementException, UnsupportedOperationException {
+ ChangelogState state;
+ switch (type) {
+ case KEY_VALUE:
+ state = (ChangelogState) keyValueStatesByName.get(name);
Review comment:
> I think we should not call changelogBackend.getOrCreateKeyedState() in
> step2 to put states within delegated backend.
But how would we apply the state changes then? When applying the changelog,
the delegated backend **has** to have its internal states already created.
Do you mean we should call some other method, but still create the internal
states in step 2?
> We would not let delegated backend to call getOrCreateKeyedState during
> restore.
`delegatedBackend.getOrCreateKeyedState` is actually never called. Only
`delegatedBackend.createInternalState` is called.
> And I think delegating state descriptor which includes serializer and
> function could help here.
In the end, internal states hold `final` references to their serializers, so
at least the serializers would have to be delegating.
I think state migration deserves a separate ticket and PR, which can be
applied after this one. WDYT?
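To illustrate the point about `final` serializer references, here is a minimal sketch of what a delegating serializer could look like. All names in it (`Serializer`, `DelegatingSerializer`, `InternalState`, `setDelegate`) are hypothetical stand-ins, not Flink classes or the API proposed in this PR; it only shows that a state can keep its `final` reference while the serializer behind it is swapped.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical names throughout; none of these are Flink classes.
final class DelegatingSerializerSketch {

    /** Minimal stand-in for a serializer. */
    interface Serializer<T> {
        byte[] serialize(T value);
    }

    /** Forwards every call to a delegate that can be replaced after construction. */
    static final class DelegatingSerializer<T> implements Serializer<T> {
        private Serializer<T> delegate;

        DelegatingSerializer(Serializer<T> initial) {
            this.delegate = initial;
        }

        void setDelegate(Serializer<T> next) {
            this.delegate = next;
        }

        @Override
        public byte[] serialize(T value) {
            return delegate.serialize(value);
        }
    }

    /** Stand-in for an internal state that keeps a final serializer reference. */
    static final class InternalState<T> {
        private final Serializer<T> serializer;

        InternalState(Serializer<T> serializer) {
            this.serializer = serializer;
        }

        byte[] write(T value) {
            return serializer.serialize(value);
        }
    }

    /** Writes the same value before and after swapping the delegate. */
    static String demo() {
        DelegatingSerializer<Integer> delegating =
                new DelegatingSerializer<Integer>(
                        v -> ("v1:" + v).getBytes(StandardCharsets.UTF_8));
        // The state captures the delegating wrapper once, as a final field.
        InternalState<Integer> state = new InternalState<>(delegating);
        String before = new String(state.write(42), StandardCharsets.UTF_8);

        // Swap the underlying serializer; the state's reference is unchanged.
        delegating.setDelegate(v -> ("v2:" + v).getBytes(StandardCharsets.UTF_8));
        String after = new String(state.write(42), StandardCharsets.UTF_8);
        return before + " " + after;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The state never has to be recreated here; only the wrapper's delegate changes, which is why delegating at the serializer level (rather than only at the descriptor level) would matter.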